Cluster Linking是Confluent Platform提供的一种功能,用于将多个Kafka集群连接在一起。该功能允许不同的Kafka集群之间进行数据的镜像和复制。Cluster Linking将在数据目标(Destination)集群启动,并复制数据源(Source)集群的数据到目标集群。本文将向您介绍如何使用云消息队列 Confluent 版的Cluster Linking。主要包括如何远程使用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。
前提条件
已经准备好数据源集群和数据目标集群。
已经准备好用于连接Source集群与Destination集群的机器。本文以ECS为例,实例创建和使用,请参见通过控制台使用ECS实例(快捷版)。
配置文件
在ECS实例中新建配置文件,用于连接Source集群与Destination集群。请将下文注意示例代码中的<username>
、<password>
、<source-cluster-address:port>
替换为您本地的配置。
创建连接Source集群的配置文件
/tmp/source.config
,并按照以下配置打开自动创建Mirror Topic、同步Consumer Group、同步ACL用户开关。security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>"; bootstrap.servers=<source-cluster-address:port> auto.create.mirror.topics.enable=true consumer.offset.sync.enable=true acl.sync.enable=true
创建连接Destination集群的配置文件
/tmp/destination.config
。security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
测试数据准备
使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。请将示例中的<source-cluster-address:port>
替换为您本地的配置。
执行以下命令,在Source集群上创建一个具有单分区的待镜像的Topic,以便更容易观察复制消息的顺序。
kafka-topics --create --topic test-topic --partitions 1 \ --bootstrap-server <source-cluster-address:port> \ --command-config /tmp/source.config
使用
list topic
和describe topic
命令查看Topic详情。#list topic kafka-topics --list --bootstrap-server <source-cluster-address:port> \ --command-config /tmp/source.config #describe topic kafka-topics --describe --topic test-topic \ --bootstrap-server <source-cluster-address:port> \ --command-config /tmp/source.config
执行以下命令,向Source集群上的test-topic发送消息。
seq 1 5 | kafka-console-producer --topic test-topic \ --bootstrap-server <source-cluster-address:port> \ --producer.config /tmp/source.config
消费Source集群中test-topic上的数据并指定Consumer Group。
# consume kafka-console-consumer --topic test-topic ---beginning \ --bootstrap-server <source-cluster-address:port> --group test-group \ --consumer.config /tmp/source.config # list consumer groups kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \ --command-config /tmp/source.config # describe offsets of consumer groups kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \ --group test-group --describe --offsets \ --command-config /tmp/source.config
如果成功消费消息,您的输出将是:
1
2
3
4
5
新增ACL用户并赋予可写权限。
# add user and write permission kafka-acls --bootstrap-server <source-cluster-address:port> \ --command-config /tmp/source.config --add --allow-principal User:test-user \ --operation READ --topic test-topic # list kafka-acls --list --bootstrap-server <source-cluster-address:port> \ --command-config /tmp/source.config
数据同步
本示例假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验,请注意替换以下示例代码中的<source-cluster-address:port>
、<destination-cluster-address:port>
。
创建选择迁移Topic的配置文件
/tmp/topic_filter.json
。{ "topicFilters": [ { "name": "test-topic", "patternType": "LITERAL", "filterType": "INCLUDE" } ] }
创建选择迁移消费者组的配置文件
/tmp/group.json
。{ "groupFilters": [ { "name": "test-group", "patternType": "LITERAL", "filterType": "INCLUDE" } ] }
创建选择迁移ACL权限的配置文件
/tmp/acl.json
。{ "aclFilters": [ { "resourceFilter": { "resourceType": "any", "patternType": "any" }, "accessFilter": { "operation": "any", "permissionType": "any" } } ] }
创建Cluster Linking并复制Topics、Consumer Groups、ACL用户权限。
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config --create --link test-cluster-link \ --config-file /tmp/source.config \ --topic-filters-json-file /tmp/topic_filter.json \ --consumer-group-filters-json-file /tmp/group.json \ --acl-filters-json-file /tmp/acl.json
待数据同步完成后,修改Mirror Topic的状态为
promote
,使Mirror Topic可读写。此时,Mirror Topic不再从源Topic同步消息。kafka-mirrors --promote --topics test-topic \ --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config
迁移测试
完成上述数据同步的步骤后,可以通过以下操作验证迁移是否成功。
查看Destination集群Topic、Consumer Group、ACL用户权限是否同步。
# list topic kafka-topics --list --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config # list consumer group kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \ --list --command-config /tmp/destination.config # list acl kafka-acls --list --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config
验证Topic生产、消费是否正常。
# produce kafka-console-producer --topic test-topic \ --bootstrap-server <destination-cluster-address:port> \ --producer.config /tmp/destination.config # consume kafka-console-consumer --topic test-topic \ --bootstrap-server <destination-cluster-address:port> \ --consumer.config /tmp/destination.config
Cluster Linking管理
本节描述如何管理已创建的Cluster Linking。请您注意替换示例代码中的<destination-cluster-address:port>
。
执行以下命令,查看Cluster Linking列表。
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \ --list --command-config /tmp/destination.config
执行以下命令,查看Cluster Linking详情。
kafka-configs --describe --cluster-link test-cluster-link \ --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config
执行以下命令,将Mirror Topic转换为普通Topic。
kafka-mirrors --promote --topics test-topic \ --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config
预期输出:
Calculating max offset and ms lag for mirror topics: [test-topic] Finished calculating max offset lag and max lag ms for mirror topics: [test-topic] Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
执行以下命令,删除Cluster Linking。
kafka-cluster-links --delete --link test-cluster-link \ --bootstrap-server <destination-cluster-address:port> \ --command-config /tmp/destination.config
预期输出:
Cluster link 'test-cluster-link' deletion successfully completed.
相关文档
关于Cluster Linking的更多信息,请参见Cluster Linking for Confluent Platform。
查看集群是否可以使用Cluster Linking,请参见支持的集群类型。